package rmqlib

import (
	"github.com/streadway/amqp"
	"log"
	"strings"
)

type MQ struct {
	Channel       *amqp.Channel
	notifyConfirm chan amqp.Confirmation
	notifyReturn  chan amqp.Return
}

func NewMQ() *MQ {
	c, err := GetConn().Channel()
	if err != nil {
		log.Println(err)
		return nil
	}
	return &MQ{
		Channel: c,
	}
}

func (this *MQ) SendMessage(key string, exchange string, message string) error {
	return this.Channel.Publish(exchange, key, true, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
	})
}

/**
SendDelayMessage 发送延迟消息
*/
func (this *MQ) SendDelayMessage(key string, exchange string, message string, delay int) error {
	return this.Channel.Publish(exchange, key, true, false, amqp.Publishing{
		Headers:     map[string]interface{}{"x-delay": delay},
		ContentType: "text/plain",
		Body:        []byte(message),
	})
}

func (this *MQ) SendFanoutMessage(exchange string, message string) error {
	return this.Channel.Publish(exchange, "", true, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
	})
}

func (this *MQ) DecQueueAndBind(queues string, key string, exchange string) error {
	qList := strings.Split(queues, ",")
	for _, queue := range qList {
		q, err := this.Channel.QueueDeclare(queue, false, false, false, false, nil)
		if err != nil {
			return err
		}
		//绑定交换机
		err = this.Channel.QueueBind(q.Name, key, exchange, false, nil)
		if err != nil {
			return err
		}
	}
	return nil
}

//设置notify模式，侦听消息有没有正确入列
func (this *MQ) NotifyReturn() {
	this.notifyReturn = this.Channel.NotifyReturn(make(chan amqp.Return))
	this.listenReturn()
}

//return回调
func (this *MQ) listenReturn() {
	ret := <-this.notifyReturn
	if string(ret.Body) != "" {
		//进行日志记录
		log.Println("消息没有正确入列：", string(ret.Body))
	} else {
		log.Println("消息正确入列", string(ret.Body))
	}
}

//confirm模式，消息有没有正确入交换机
func (this *MQ) SetConfirm() {
	err := this.Channel.Confirm(false)
	if err != nil {
		log.Fatal(err)
	}
	//设置回调通道
	this.notifyConfirm = this.Channel.NotifyPublish(make(chan amqp.Confirmation))
}

//cofirm模式的回调处理
func (this *MQ) ListenConfirm() {
	ret := <-this.notifyConfirm
	if ret.Ack {
		log.Println("消息发送成功")
	} else {
		log.Println("消息发送失败")
	}
}

//消费
func (this *MQ) Consume(queue string, key string, callback func(<-chan amqp.Delivery, string, map[string]interface{}), config map[string]interface{}) {
	msgs, err := this.Channel.Consume(queue, key, false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	callback(msgs, key, config)
}
